Skip to content

OpenManus 实现细节

深入源码,理解关键功能的实现原理

本章概览

  • LLM 调用封装:Token 管理与重试机制
  • 浏览器自动化实现:browser-use 集成
  • MCP 协议实现:连接外部服务
  • 代码执行沙箱:安全隔离执行
  • PlanningFlow:任务规划与执行

1. LLM 调用封装

1.1 单例模式设计

OpenManus 使用单例模式管理 LLM 实例,相同配置复用同一实例:

python
# app/llm.py
class LLM:
    """LLM client wrapper that is shared per configuration name.

    ``LLM(name)`` returns the same object for the same ``name``: instances are
    cached in ``_instances`` so the underlying API client is created once.
    """

    _instances: Dict[str, "LLM"] = {}  # config_name -> shared instance

    def __new__(cls, config_name: str = "default", llm_config: Optional[LLMSettings] = None):
        """Return the cached instance for ``config_name``, building it on first use."""
        if config_name not in cls._instances:
            instance = super().__new__(cls)
            # Initialize before caching so a failed init is never stored.
            instance.__init__(config_name, llm_config)
            cls._instances[config_name] = instance
        return cls._instances[config_name]

    def __init__(self, config_name: str = "default", llm_config: Optional[LLMSettings] = None):
        """Load settings for ``config_name`` and create the matching API client.

        Args:
            config_name: Key of the settings entry to use; falls back to the
                ``"default"`` entry when the key is missing.
            llm_config: Optional settings mapping; ``config.llm`` is used when
                omitted.
        """
        # Python calls __init__ again after __new__ returns a cached object;
        # the presence of ``client`` marks an already-initialized instance.
        if hasattr(self, "client"):
            return

        llm_config = llm_config or config.llm
        llm_config = llm_config.get(config_name, llm_config["default"])

        self.model = llm_config.model
        self.max_tokens = llm_config.max_tokens
        self.temperature = llm_config.temperature
        # These four are read below when choosing and building the client, so
        # they must be set first.
        # NOTE(review): assumes the settings object exposes these fields next
        # to model/max_tokens/temperature - confirm against LLMSettings.
        self.api_type = llm_config.api_type
        self.api_key = llm_config.api_key
        self.api_version = llm_config.api_version
        self.base_url = llm_config.base_url

        # Pick the client implementation from the configured API type.
        if self.api_type == "azure":
            self.client = AsyncAzureOpenAI(
                base_url=self.base_url,
                api_key=self.api_key,
                api_version=self.api_version,
            )
        elif self.api_type == "aws":
            self.client = BedrockClient()
        else:
            self.client = AsyncOpenAI(
                api_key=self.api_key,
                base_url=self.base_url
            )

为什么使用单例?

                    没有单例                              使用单例
┌─────────────────────────────────┐    ┌─────────────────────────────────┐
│                                 │    │                                 │
│  Agent1 ──▶ LLM("default")     │    │  Agent1 ──┐                     │
│                     ↓           │    │           │                     │
│            新建 OpenAI 客户端    │    │           ▼                     │
│                                 │    │     LLM("default") ◀── 共享     │
│  Agent2 ──▶ LLM("default")     │    │           ▲                     │
│                     ↓           │    │           │                     │
│            又新建 OpenAI 客户端   │    │  Agent2 ──┘                     │
│                                 │    │                                 │
│  ❌ 资源浪费,连接开销大           │    │  ✅ 复用连接,减少开销           │
└─────────────────────────────────┘    └─────────────────────────────────┘

1.2 Token 计数器

精确计算 Token 用量对于控制成本和避免超限至关重要:

python
# app/llm.py
class TokenCounter:
    """Estimate token usage for text, images and chat messages."""

    # Token constants
    BASE_MESSAGE_TOKENS = 4       # fixed overhead added per message
    FORMAT_TOKENS = 2             # fixed overhead added once per message list
    LOW_DETAIL_IMAGE_TOKENS = 85  # flat cost of a low-detail image
    HIGH_DETAIL_TILE_TOKENS = 170  # cost per tile of a high-detail image

    def __init__(self, tokenizer):
        """Store the tokenizer; it must provide ``encode(text)`` returning a sized sequence."""
        self.tokenizer = tokenizer

    def count_text(self, text: str) -> int:
        """Return the number of tokens in ``text`` (0 for empty or ``None``)."""
        if not text:
            return 0
        return len(self.tokenizer.encode(text))

    def count_image(self, image_item: dict) -> int:
        """Return the token estimate for one image item.

        Args:
            image_item: Dict that may carry ``"detail"`` (defaults to
                ``"medium"``) and ``"dimensions"`` as ``(width, height)``.
        """
        detail = image_item.get("detail", "medium")

        if detail == "low":
            return self.LOW_DETAIL_IMAGE_TOKENS

        # Known dimensions: use the tile-based calculation.
        if "dimensions" in image_item:
            width, height = image_item["dimensions"]
            return self._calculate_high_detail_tokens(width, height)

        # Unknown dimensions: flat estimate, or a 1024x1024 image for "high".
        return 1024 if detail != "high" else self._calculate_high_detail_tokens(1024, 1024)

    def _calculate_high_detail_tokens(self, width: int, height: int) -> int:
        """Return the tile-based token estimate for a ``width`` x ``height`` image.

        Non-positive dimensions have no tiles and cost only the base
        ``LOW_DETAIL_IMAGE_TOKENS``.
        """
        MAX_SIZE = 2048
        TARGET_SHORT_SIDE = 768
        TILE_SIZE = 512

        # A zero or negative side would divide by zero in step 2.
        if width <= 0 or height <= 0:
            return self.LOW_DETAIL_IMAGE_TOKENS

        # Step 1: fit inside 2048x2048. Clamp to 1 px because truncation can
        # otherwise collapse the short side of a very elongated image to 0.
        if width > MAX_SIZE or height > MAX_SIZE:
            scale = MAX_SIZE / max(width, height)
            width = max(1, int(width * scale))
            height = max(1, int(height * scale))

        # Step 2: scale so the shorter side is 768 px.
        scale = TARGET_SHORT_SIDE / min(width, height)
        scaled_width = int(width * scale)
        scaled_height = int(height * scale)

        # Step 3: count 512x512 tiles.
        tiles_x = math.ceil(scaled_width / TILE_SIZE)
        tiles_y = math.ceil(scaled_height / TILE_SIZE)
        total_tiles = tiles_x * tiles_y

        # Step 4: per-tile cost plus the base cost.
        return total_tiles * self.HIGH_DETAIL_TILE_TOKENS + self.LOW_DETAIL_IMAGE_TOKENS

    def count_message_tokens(self, messages: List[dict]) -> int:
        """Return the total token estimate for a list of message dicts.

        NOTE(review): ``count_content`` and ``count_tool_calls`` are not part
        of this excerpt; presumably defined elsewhere on the class - confirm.
        """
        total = self.FORMAT_TOKENS

        for message in messages:
            tokens = self.BASE_MESSAGE_TOKENS
            tokens += self.count_text(message.get("role", ""))
            tokens += self.count_content(message.get("content"))
            tokens += self.count_tool_calls(message.get("tool_calls", []))
            tokens += self.count_text(message.get("name", ""))
            tokens += self.count_text(message.get("tool_call_id", ""))
            total += tokens

        return total

1.3 重试机制

使用 tenacity 库实现指数退避重试:

python
from tenacity import (
    retry,
    retry_if_exception_type,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)

@retry(
    wait=wait_random_exponential(min=1, max=60),  # randomized exponential backoff, capped at 60 s
    stop=stop_after_attempt(6),                   # at most 6 attempts in total
    # The tuple contains ``Exception``, which would also match
    # TokenLimitExceeded (presumably an Exception subclass). It is excluded
    # explicitly so the "do not retry" re-raise below takes effect.
    retry=(
        retry_if_exception_type((OpenAIError, Exception, ValueError))
        & retry_if_not_exception_type(TokenLimitExceeded)
    ),
)
async def ask_tool(
    self,
    messages: List[Union[dict, Message]],
    tools: Optional[List[dict]] = None,
    tool_choice: TOOL_CHOICE_TYPE = ToolChoice.AUTO,
    **kwargs,
) -> ChatCompletionMessage:
    """Send a non-streaming tool-calling request, retrying on failure.

    Args:
        messages: Conversation messages to send.
        tools: Tool definitions offered to the model.
        tool_choice: Tool selection mode passed through to the API.
        **kwargs: Accepted for interface compatibility; not used in this body.

    Returns:
        The message of the first choice in the completion response.

    Raises:
        TokenLimitExceeded: When the input token count fails
            ``check_token_limit``; this error is not retried.
        OpenAIError: Logged and re-raised; retried by the decorator.
    """
    try:
        # Refuse requests whose input does not pass the token limit check.
        input_tokens = self.count_message_tokens(messages)
        if not self.check_token_limit(input_tokens):
            raise TokenLimitExceeded(self.get_limit_error_message(input_tokens))

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=tools,
            tool_choice=tool_choice,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            stream=False,  # tool calls do not use streaming
        )

        # Record usage reported by the API.
        self.update_token_count(
            response.usage.prompt_tokens,
            response.usage.completion_tokens
        )

        return response.choices[0].message

    except TokenLimitExceeded:
        raise  # propagate unchanged; excluded from retry by the decorator
    except OpenAIError as e:
        logger.error(f"OpenAI API error: {e}")
        raise

重试策略可视化

第1次失败 ──▶ 等待约 1秒 ──▶ 第2次尝试
第2次失败 ──▶ 等待 1-2秒 ──▶ 第3次尝试
第3次失败 ──▶ 等待 1-4秒 ──▶ 第4次尝试
...(等待区间上限随尝试次数翻倍,最长不超过 60秒)
第6次失败 ──▶ 不再重试,抛出异常

2. 浏览器自动化实现

2.1 browser-use 集成

OpenManus 基于 browser-use 库实现浏览器自动化:

python
# app/tool/browser_use_tool.py
from browser_use import Browser as BrowserUseBrowser
from browser_use import BrowserConfig
from browser_use.browser.context import BrowserContext, BrowserContextConfig

class BrowserUseTool(BaseTool):
    """Tool definition for browser automation built on the browser-use library."""

    name: str = "browser_use"
    description: str = "强大的浏览器自动化工具..."

    # Browser handles default to None; nothing is launched when the tool is declared
    browser: Optional[BrowserUseBrowser] = None
    context: Optional[BrowserContext] = None
    dom_service: Optional[DomService] = None

    # Lock for concurrency control; each instance gets its own via default_factory
    lock: asyncio.Lock = Field(default_factory=asyncio.Lock)

2.2 浏览器初始化

python
async def _ensure_browser_initialized(self) -> BrowserContext:
    """Create the browser and its context on first use and return the context.

    Both objects are built only when the corresponding attribute is still
    ``None``, so repeated calls reuse what already exists.
    """
    if self.browser is None:
        # Visible (non-headless) browser with security restrictions disabled.
        options = {"headless": False, "disable_security": True}

        # Forward proxy settings when the configuration provides them.
        browser_settings = config.browser_config
        if browser_settings and browser_settings.proxy:
            proxy = browser_settings.proxy
            options["proxy"] = ProxySettings(
                server=proxy.server,
                username=proxy.username,
                password=proxy.password,
            )

        self.browser = BrowserUseBrowser(BrowserConfig(**options))

    if self.context is None:
        self.context = await self.browser.new_context(BrowserContextConfig())
        # The DOM service is bound to the context's current page.
        current_page = await self.context.get_current_page()
        self.dom_service = DomService(current_page)

    return self.context

2.3 操作执行实现

python
async def execute(
    self,
    action: str,
    url: Optional[str] = None,
    index: Optional[int] = None,
    text: Optional[str] = None,
    **kwargs,
) -> ToolResult:
    """Run one browser action and report the outcome as a ``ToolResult``.

    Args:
        action: Name of the action, e.g. ``"go_to_url"``, ``"click_element"``,
            ``"input_text"``, ``"scroll_down"``, ``"scroll_up"``,
            ``"extract_content"``.
        url: Target URL for ``go_to_url``.
        index: Element index for ``click_element`` and ``input_text``.
        text: Text to type for ``input_text``.
        **kwargs: Action-specific extras; ``scroll_amount`` (default 500) and
            ``goal`` are read here.

    Returns:
        A ``ToolResult`` with ``output`` on success or ``error`` on invalid
        arguments or any exception raised while performing the action.
    """
    async with self.lock:  # one browser action at a time
        try:
            context = await self._ensure_browser_initialized()

            # Navigation
            if action == "go_to_url":
                if not url:
                    return ToolResult(error="URL is required")
                page = await context.get_current_page()
                await page.goto(url)
                await page.wait_for_load_state()
                return ToolResult(output=f"Navigated to {url}")

            # Click
            elif action == "click_element":
                if index is None:
                    return ToolResult(error="Index is required")
                element = await context.get_dom_element_by_index(index)
                if not element:
                    return ToolResult(error=f"Element {index} not found")
                await context._click_element_node(element)
                return ToolResult(output=f"Clicked element at index {index}")

            # Text input
            elif action == "input_text":
                if index is None or not text:
                    return ToolResult(error="Index and text required")
                element = await context.get_dom_element_by_index(index)
                # Same missing-element handling as click_element, instead of
                # passing a missing element on to the typing helper.
                if not element:
                    return ToolResult(error=f"Element {index} not found")
                await context._input_text_element_node(element, text)
                return ToolResult(output=f"Input '{text}' into element {index}")

            # Scrolling
            elif action in ("scroll_down", "scroll_up"):
                direction = 1 if action == "scroll_down" else -1
                amount = kwargs.get("scroll_amount", 500)
                await context.execute_javascript(
                    f"window.scrollBy(0, {direction * amount});"
                )
                return ToolResult(output=f"Scrolled {action.split('_')[1]}")

            # Content extraction
            elif action == "extract_content":
                goal = kwargs.get("goal")
                if not goal:
                    return ToolResult(error="Goal is required")

                page = await context.get_current_page()
                import markdownify
                content = markdownify.markdownify(await page.content())

                # Ask the LLM to extract what the goal describes; only the
                # first 2000 characters of the page are included.
                prompt = f"""
                提取目标: {goal}
                页面内容:
                {content[:2000]}
                """
                response = await self.llm.ask_tool(
                    messages=[{"role": "system", "content": prompt}],
                    tools=[extraction_function],
                    tool_choice="required",
                )
                # ... response parsing omitted ...

            # ... more actions omitted ...

        except Exception as e:
            return ToolResult(error=f"Browser action '{action}' failed: {str(e)}")

2.4 获取浏览器状态

python
async def get_current_state(self) -> ToolResult:
    """Return the browser state as JSON together with a base64 JPEG screenshot.

    Returns an error ``ToolResult`` when no context exists yet or when any
    step raises.
    """
    try:
        if not self.context:
            return ToolResult(error="Browser not initialized")
        browser_ctx = self.context

        snapshot = await browser_ctx.get_state()

        # Bring the current page forward and wait for it to load before capturing.
        current_page = await browser_ctx.get_current_page()
        await current_page.bring_to_front()
        await current_page.wait_for_load_state()

        shot_options = {
            "full_page": True,
            "animations": "disabled",
            "type": "jpeg",
            "quality": 100,
        }
        raw_image = await current_page.screenshot(**shot_options)
        encoded_image = base64.b64encode(raw_image).decode("utf-8")

        # Collect the fields in the order they appear in the JSON output.
        page_url = snapshot.url
        page_title = snapshot.title
        tab_dumps = [tab.model_dump() for tab in snapshot.tabs]
        elements_text = snapshot.element_tree.clickable_elements_to_string()
        scroll_info = {
            "pixels_above": snapshot.pixels_above,
            "pixels_below": snapshot.pixels_below,
        }
        state_info = {
            "url": page_url,
            "title": page_title,
            "tabs": tab_dumps,
            "interactive_elements": elements_text,
            "scroll_info": scroll_info,
        }

        serialized = json.dumps(state_info, indent=4, ensure_ascii=False)
        return ToolResult(output=serialized, base64_image=encoded_image)

    except Exception as exc:
        return ToolResult(error=f"Failed to get state: {str(exc)}")

3. MCP 协议实现

3.1 MCP 概述

Model Context Protocol (MCP) 是一种让 AI Agent 连接外部服务的标准协议:

┌────────────────────────────────────────────────────────┐
│                    OpenManus Agent                      │
│  ┌──────────────────────────────────────────────────┐  │
│  │                   MCPClients                      │  │
│  │                                                   │  │
│  │   Session 1        Session 2        Session 3    │  │
│  │      ↓                ↓                 ↓        │  │
│  │   ┌─────┐         ┌─────┐          ┌─────┐      │  │
│  │   │ SSE │         │STDIO│          │ SSE │      │  │
│  │   └──┬──┘         └──┬──┘          └──┬──┘      │  │
│  └──────┼───────────────┼───────────────┼──────────┘  │
└─────────┼───────────────┼───────────────┼──────────────┘
          ↓               ↓               ↓
    ┌──────────┐    ┌──────────┐    ┌──────────┐
    │ 文件系统  │    │  数据库   │    │ 第三方API│
    │ MCP 服务 │    │ MCP 服务 │    │ MCP 服务 │
    └──────────┘    └──────────┘    └──────────┘

3.2 MCPClients 实现

python
# app/tool/mcp.py
from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client

class MCPClients(ToolCollection):
    """Manage connections to multiple MCP servers, one session per server id."""

    # NOTE(review): both dicts are class-level mutable defaults; unless the
    # base class copies them per instance they are shared by all instances -
    # confirm against ToolCollection.
    sessions: Dict[str, ClientSession] = {}
    exit_stacks: Dict[str, AsyncExitStack] = {}

    async def connect_sse(self, server_url: str, server_id: str = "") -> None:
        """Connect to an MCP server over SSE (Server-Sent Events).

        Args:
            server_url: URL of the SSE endpoint.
            server_id: Key for this connection; defaults to ``server_url``.
        """
        server_id = server_id or server_url
        await self._open_session(server_id, lambda: sse_client(url=server_url))

        # Initialize the session and fetch the server's tools.
        await self._initialize_and_list_tools(server_id)

    async def connect_stdio(self, command: str, args: List[str], server_id: str = "") -> None:
        """Connect to an MCP server over standard input/output.

        Args:
            command: Executable handed to ``StdioServerParameters``.
            args: Arguments for ``command``.
            server_id: Key for this connection; defaults to ``command``.
        """
        server_id = server_id or command
        await self._open_session(
            server_id,
            lambda: stdio_client(StdioServerParameters(command=command, args=args)),
        )

        await self._initialize_and_list_tools(server_id)

    async def _open_session(self, server_id: str, transport_factory) -> None:
        """Open a transport and a ``ClientSession`` on it, registered under ``server_id``.

        ``transport_factory`` is a zero-argument callable returning the async
        context manager for the transport; it is called only after any
        existing connection with the same id has been disconnected.
        """
        # Replace an existing connection with the same id.
        if server_id in self.sessions:
            await self.disconnect(server_id)

        exit_stack = AsyncExitStack()
        self.exit_stacks[server_id] = exit_stack

        try:
            streams = await exit_stack.enter_async_context(transport_factory())
            session = await exit_stack.enter_async_context(ClientSession(*streams))
        except Exception:
            # No session was registered, so nothing else would ever close this
            # stack: release whatever was opened and drop the stale entry.
            self.exit_stacks.pop(server_id, None)
            await exit_stack.aclose()
            raise

        self.sessions[server_id] = session

3.3 工具发现与注册

python
async def _initialize_and_list_tools(self, server_id: str) -> None:
    """Initialize the session for ``server_id`` and register its tools.

    Raises:
        RuntimeError: When no session is stored under ``server_id``.
    """
    active_session = self.sessions.get(server_id)
    if not active_session:
        raise RuntimeError(f"Session not found: {server_id}")

    await active_session.initialize()
    listing = await active_session.list_tools()

    for remote_tool in listing.tools:
        remote_name = remote_tool.name
        # Prefix with the server id to avoid name clashes between tools.
        local_name = self._sanitize_tool_name(f"mcp_{server_id}_{remote_name}")

        # The proxy keeps the original name for forwarding the call.
        self.tool_map[local_name] = MCPClientTool(
            name=local_name,
            description=remote_tool.description,
            parameters=remote_tool.inputSchema,
            session=active_session,
            server_id=server_id,
            original_name=remote_name,
        )

    self.tools = tuple(self.tool_map.values())
    logger.info(f"Connected to {server_id} with tools: {[t.name for t in listing.tools]}")

3.4 MCPClientTool:工具代理

python
class MCPClientTool(BaseTool):
    """Tool proxy that forwards calls to a tool on an MCP server."""

    session: Optional[ClientSession] = None
    server_id: str = ""
    original_name: str = ""

    async def execute(self, **kwargs) -> ToolResult:
        """Call the remote tool with ``kwargs`` and return its text output.

        Returns an error ``ToolResult`` when there is no session or the call
        raises; ``"No output"`` when the result has no text items.
        """
        if not self.session:
            return ToolResult(error="Not connected to MCP server")

        try:
            # The server knows the tool by its original name.
            remote_result = await self.session.call_tool(self.original_name, kwargs)

            # Only text items contribute to the output.
            text_parts = [
                part.text
                for part in remote_result.content
                if isinstance(part, TextContent)
            ]
            joined = ", ".join(text_parts)
            return ToolResult(output=joined or "No output")

        except Exception as exc:
            return ToolResult(error=f"MCP tool error: {str(exc)}")

4. 代码执行沙箱

4.1 多进程隔离

PythonExecute 在独立子进程中执行代码,从而隔离崩溃并支持超时终止(注意:传给 exec 的命名空间仍包含完整的内置函数,这属于进程级隔离,而不是严格意义上的安全沙箱):

python
# app/tool/python_execute.py
import multiprocessing
from io import StringIO
import sys

class PythonExecute(BaseTool):
    """Tool that runs Python code in a child process with a timeout."""

    name: str = "python_execute"
    description: str = "执行 Python 代码。注意:只有 print 输出可见"

    def _run_code(self, code: str, result_dict: dict, safe_globals: dict) -> None:
        """Execute ``code`` and store the captured stdout or the error text.

        Writes ``"observation"`` and ``"success"`` into ``result_dict``.
        """
        # NOTE(review): exec runs the caller-supplied code with the builtins
        # passed in safe_globals; confirm callers only pass trusted code.
        saved_stdout = sys.stdout
        captured = StringIO()
        sys.stdout = captured  # collect everything the code prints
        try:
            exec(code, safe_globals, safe_globals)

            result_dict["observation"] = captured.getvalue()
            result_dict["success"] = True
        except Exception as exc:
            result_dict["observation"] = str(exc)
            result_dict["success"] = False
        finally:
            sys.stdout = saved_stdout

    async def execute(self, code: str, timeout: int = 5) -> Dict:
        """Run ``code`` in a separate process and return its result dict.

        Args:
            code: Python source to execute.
            timeout: Seconds to wait before terminating the process.

        Returns:
            Dict with ``"observation"`` (output or error text) and
            ``"success"``.
        """
        with multiprocessing.Manager() as manager:
            # Manager dict so the child process can report back.
            shared = manager.dict({"observation": "", "success": False})

            # __builtins__ is either a dict or a module depending on context.
            builtins_ns = (
                __builtins__
                if isinstance(__builtins__, dict)
                else __builtins__.__dict__.copy()
            )
            safe_globals = {"__builtins__": builtins_ns}

            worker = multiprocessing.Process(
                target=self._run_code,
                args=(code, shared, safe_globals)
            )
            worker.start()
            worker.join(timeout)

            if not worker.is_alive():
                return dict(shared)

            # Still running after the timeout: stop it and report the timeout.
            worker.terminate()
            worker.join(1)
            return {
                "observation": f"Execution timeout after {timeout} seconds",
                "success": False,
            }

执行隔离示意

┌─────────────────────────────────────────────┐
│              主进程 (OpenManus)              │
│                                             │
│  ┌─────────────────────────────────────┐   │
│  │          multiprocessing.Process    │   │
│  │  ┌─────────────────────────────┐   │   │
│  │  │       子进程(沙箱)         │   │   │
│  │  │                             │   │   │
│  │  │  exec(code, safe_globals)  │   │   │
│  │  │                             │   │   │
│  │  │  stdout ──▶ StringIO       │   │   │
│  │  │                             │   │   │
│  │  └─────────────────────────────┘   │   │
│  │              ↓                      │   │
│  │     Manager.dict (共享结果)          │   │
│  └─────────────────────────────────────┘   │
│                     ↓                       │
│              返回执行结果                    │
└─────────────────────────────────────────────┘

4.2 文件操作沙箱

StrReplaceEditor 支持本地和沙箱两种模式:

python
# app/tool/str_replace_editor.py
class StrReplaceEditor(BaseTool):
    """File editing tool that works through a local or a sandbox file operator."""

    _local_operator: LocalFileOperator = LocalFileOperator()
    _sandbox_operator: SandboxFileOperator = SandboxFileOperator()

    def _get_operator(self) -> FileOperator:
        """Return the sandbox operator when ``config.sandbox.use_sandbox`` is set, else the local one."""
        return (
            self._sandbox_operator
            if config.sandbox.use_sandbox
            else self._local_operator
        )

    async def execute(
        self,
        *,
        command: Command,  # view, create, str_replace, insert, undo_edit
        path: str,
        **kwargs,
    ) -> str:
        """Validate ``path`` and dispatch ``command`` to the matching operation.

        Keys read from ``kwargs`` here: ``view_range`` (view), ``file_text``
        (create), ``old_str`` and ``new_str`` (str_replace).
        """
        operator = self._get_operator()

        # Validate the path for this command before touching the file
        await self.validate_path(command, Path(path), operator)

        if command == "view":
            return await self.view(path, kwargs.get("view_range"), operator)
        elif command == "create":
            await operator.write_file(path, kwargs["file_text"])
            # Record the written text in the per-path history
            # NOTE(review): _file_history is not defined in this excerpt - presumably a dict of lists; confirm
            self._file_history[path].append(kwargs["file_text"])
            return f"File created at: {path}"
        elif command == "str_replace":
            return await self.str_replace(
                path, kwargs["old_str"], kwargs.get("new_str"), operator
            )
        # ... more commands omitted

5. PlanningFlow 实现

5.1 计划创建

python
# app/flow/planning.py
async def _create_initial_plan(self, request: str) -> None:
    """Create the initial plan for ``request`` using the LLM.

    The first ``planning`` tool call with usable arguments is executed under
    ``self.active_plan_id``. If the model makes no such call, or its
    arguments are not a valid JSON object, a default three-step plan is
    created instead.
    """
    system_content = (
        "你是一个规划助手。创建简洁、可执行的计划。"
        "关注关键里程碑而非细节步骤。"
    )

    # With several executors, describe them so steps can name an agent.
    if len(self.executor_keys) > 1:
        agents_description = [
            {"name": key.upper(), "description": self.agents[key].description}
            for key in self.executor_keys if key in self.agents
        ]
        system_content += f"\n可用 Agent: {json.dumps(agents_description)}"
        system_content += "\n创建步骤时使用 '[AGENT_NAME]' 格式指定执行者。"

    response = await self.llm.ask_tool(
        messages=[Message.user_message(f"创建计划完成任务: {request}")],
        system_msgs=[Message.system_message(system_content)],
        tools=[self.planning_tool.to_param()],
        tool_choice=ToolChoice.AUTO,
    )

    for tool_call in response.tool_calls or []:
        if tool_call.function.name != "planning":
            continue

        # Arguments come from the model as a JSON string and may be
        # malformed; skip unusable ones so the default plan below is used
        # rather than failing plan creation.
        try:
            args = json.loads(tool_call.function.arguments)
        except (json.JSONDecodeError, TypeError):
            continue
        if not isinstance(args, dict):
            continue

        # Always create the plan under the flow's own plan id.
        args["plan_id"] = self.active_plan_id
        await self.planning_tool.execute(**args)
        return

    # The model did not produce a usable planning call: create a default plan.
    await self.planning_tool.execute(
        command="create",
        plan_id=self.active_plan_id,
        title=f"Plan for: {request[:50]}",
        steps=["分析请求", "执行任务", "验证结果"],
    )

5.2 步骤执行

python
async def _execute_step(self, executor: BaseAgent, step_info: dict) -> str:
    """Run the current plan step with ``executor`` and mark it completed.

    Returns the executor's result, or an ``"Error: ..."`` string when running
    the step or marking it completed raises.
    """
    plan_status = await self._get_plan_text()

    # Fall back to a generic label when the step carries no text.
    fallback_label = f"Step {self.current_step_index}"
    step_text = step_info.get("text", fallback_label)

    step_prompt = f"""
    当前计划状态:
    {plan_status}

    你的当前任务:
    正在执行步骤 {self.current_step_index}: "{step_text}"

    请只执行当前步骤,完成后提供摘要。
    """

    try:
        outcome = await executor.run(step_prompt)
        # Reached only when the executor finished without raising.
        await self._mark_step_completed()
        return outcome
    except Exception as err:
        logger.error(f"Step {self.current_step_index} failed: {err}")
        return f"Error: {str(err)}"

5.3 计划状态管理

python
class PlanStepStatus(str, Enum):
    """Possible statuses of a single plan step."""

    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    BLOCKED = "blocked"

    @classmethod
    def get_active_statuses(cls) -> list[str]:
        """Return the status values of steps that still have to be executed.

        Steps that are not started or in progress are selected for execution;
        completed and blocked steps are not.
        """
        return [cls.NOT_STARTED.value, cls.IN_PROGRESS.value]

    @classmethod
    def get_status_marks(cls) -> Dict[str, str]:
        """Return the display marker for each status value."""
        return {
            cls.COMPLETED.value: "[✓]",
            cls.IN_PROGRESS.value: "[→]",
            cls.BLOCKED.value: "[!]",
            cls.NOT_STARTED.value: "[ ]",
        }

async def _get_current_step_info(self) -> tuple[Optional[int], Optional[dict]]:
    """Find the first active step, mark it in progress and return it.

    Returns:
        ``(index, step_info)`` for the first step whose status is active, or
        ``(None, None)`` when the plan is missing or has no active step.
        ``step_info`` holds ``"text"`` and, when the step contains a tag such
        as ``[MANUS]``, the lower-cased tag under ``"type"``.
    """
    import re

    plan = self.planning_tool.plans.get(self.active_plan_id)
    if not plan:
        return None, None

    all_steps = plan.get("steps", [])
    recorded_statuses = plan.get("step_statuses", [])

    for position, step in enumerate(all_steps):
        # Steps without a recorded status count as not started.
        if position < len(recorded_statuses):
            status = recorded_statuses[position]
        else:
            status = PlanStepStatus.NOT_STARTED.value

        if status not in PlanStepStatus.get_active_statuses():
            continue

        step_info = {"text": step}

        # Pull the step type out of a tag like [MANUS] or [DATA_ANALYSIS].
        tag = re.search(r"\[([A-Z_]+)\]", step)
        if tag:
            step_info["type"] = tag.group(1).lower()

        await self.planning_tool.execute(
            command="mark_step",
            plan_id=self.active_plan_id,
            step_index=position,
            step_status=PlanStepStatus.IN_PROGRESS.value,
        )

        return position, step_info

    return None, None  # no active step left

6. 防循环机制

6.1 检测重复响应

python
# app/agent/base.py
def is_stuck(self) -> bool:
    """Report whether the latest message repeats earlier assistant messages.

    Returns ``True`` when the number of earlier assistant messages with the
    same content as the last message reaches ``self.duplicate_threshold``.
    """
    history = self.memory.messages
    if len(history) < 2:
        return False

    latest = history[-1]
    if not latest.content:
        return False

    # Count earlier assistant messages whose content equals the latest one.
    repeats = 0
    for earlier in history[:-1]:
        if earlier.role == "assistant" and earlier.content == latest.content:
            repeats += 1

    return repeats >= self.duplicate_threshold

6.2 处理卡住状态

python
def handle_stuck_state(self):
    """Prepend a change-of-strategy hint to ``next_step_prompt`` and log it."""
    hint = "检测到重复响应。请考虑新策略,避免重复已尝试过的无效路径。"
    # The hint goes first so it is read before the existing prompt.
    self.next_step_prompt = f"{hint}\n{self.next_step_prompt}"
    logger.warning(f"Agent stuck detected. Added prompt: {hint}")

防循环流程

┌─────────────────────────────────────────┐
│              执行步骤                    │
│                 ↓                        │
│          is_stuck() 检测                 │
│           /        \                     │
│         否          是                    │
│         ↓           ↓                    │
│      继续执行    handle_stuck_state()    │
│                     ↓                    │
│              修改提示词                   │
│                     ↓                    │
│              下一步尝试新策略             │
└─────────────────────────────────────────┘

7. 资源清理

7.1 Agent 清理

python
# app/agent/toolcall.py
async def cleanup(self):
    """Await the ``cleanup`` coroutine of every available tool that has one.

    A failure in one tool is logged and does not stop the remaining tools
    from being cleaned up.
    """
    logger.info(f"🧹 Cleaning up resources for '{self.name}'...")

    for tool_name, tool_instance in self.available_tools.tool_map.items():
        # Tools without an async cleanup method are skipped.
        cleanup_fn = getattr(tool_instance, "cleanup", None)
        if not asyncio.iscoroutinefunction(cleanup_fn):
            continue
        try:
            await cleanup_fn()
        except Exception as e:
            logger.error(f"Error cleaning up tool '{tool_name}': {e}")

    logger.info(f"✨ Cleanup complete for '{self.name}'.")

async def run(self, request: Optional[str] = None) -> str:
    """Run the agent through the parent implementation, then always clean up.

    Args:
        request: Optional request passed unchanged to the parent ``run``.

    Returns:
        Whatever the parent ``run`` returns.
    """
    try:
        return await super().run(request)
    finally:
        # Executed whether the parent run returned or raised
        await self.cleanup()

7.2 浏览器清理

python
# app/tool/browser_use_tool.py
async def cleanup(self):
    """Close the browser context and the browser, and reset the references.

    The browser is closed even when closing the context raises, and each
    reference is cleared even when its ``close()`` fails, so a failed cleanup
    does not leave a half-closed object to be reused. An exception from
    ``close()`` still propagates to the caller.
    """
    async with self.lock:
        try:
            if self.context is not None:
                try:
                    await self.context.close()
                finally:
                    self.context = None
                    self.dom_service = None
        finally:
            # Runs regardless of the context outcome so the browser is not
            # left open.
            if self.browser is not None:
                try:
                    await self.browser.close()
                finally:
                    self.browser = None

def __del__(self):
    """Best-effort release of browser resources when the object is destroyed.

    Does nothing when neither a browser nor a context exists. Otherwise the
    async ``cleanup`` is run to completion if no event loop is running in
    this thread, or scheduled on the running loop if there is one.
    """
    # getattr: __del__ may run on an object whose attributes were never set.
    if getattr(self, "browser", None) is None and getattr(self, "context", None) is None:
        return

    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is None:
        # No loop is running here, so blocking until cleanup finishes is safe.
        asyncio.run(self.cleanup())
    else:
        # asyncio.run() and run_until_complete() both refuse to start while a
        # loop is running in this thread, so hand the cleanup to that loop.
        running_loop.create_task(self.cleanup())

8. 实现细节总结

| 功能 | 实现要点 | 关键技术 |
| --- | --- | --- |
| LLM 调用 | 单例模式、Token 计数、重试机制 | tenacity, tiktoken |
| 浏览器自动化 | 懒加载、并发锁、状态截图 | browser-use, Playwright |
| MCP 连接 | SSE/STDIO 双模式、工具代理 | mcp SDK, AsyncExitStack |
| 代码执行 | 多进程隔离、超时控制、输出捕获 | multiprocessing |
| 任务规划 | 步骤分解、状态跟踪、多 Agent 调度 | PlanningTool, 状态机 |
| 防循环 | 重复检测、策略调整提示 | 消息内容相等比较 |
| 资源管理 | 显式清理、析构保护 | async cleanup, `__del__` |

下一章:22.5 使用指南 - 安装配置与实践操作

基于 MIT 许可证发布。内容版权归作者所有。